{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "binding-delta",
   "metadata": {
    "papermill": {
     "duration": 0.016304,
     "end_time": "2021-03-22T20:29:23.476444",
     "exception": false,
     "start_time": "2021-03-22T20:29:23.460140",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "# spark-csv-to-parquet"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "63d16770-79ac-4255-a2d0-29412418bebf",
   "metadata": {},
   "source": [
    "Converts a CSV file with header to parquet using ApacheSpark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "satellite-vegetation",
   "metadata": {
    "papermill": {
     "duration": 2.246299,
     "end_time": "2021-03-22T20:29:25.734626",
     "exception": false,
     "start_time": "2021-03-22T20:29:23.488327",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "export version=`python --version |awk '{print $2}' |awk -F\".\" '{print $1$2}'`\n",
    "\n",
    "echo $version\n",
    "\n",
    "if [ $version == '36' ] || [ $version == '37' ]; then\n",
    "    echo 'Starting installation...'\n",
    "    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "elif [ $version == '38' ] || [ $version == '39' ]; then\n",
    "    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "else\n",
    "    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'\n",
    "    exit -1\n",
    "fi"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "beginning-wisdom",
   "metadata": {
    "papermill": {
     "duration": 0.164002,
     "end_time": "2021-03-22T20:29:25.951504",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.787502",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "import os\n",
    "import sys\n",
    "import logging\n",
    "import re"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abstract-cambridge",
   "metadata": {
    "papermill": {
     "duration": 0.012801,
     "end_time": "2021-03-22T20:29:25.972462",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.959661",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# source path and file name (default: data.csv)\n",
    "data_csv = os.environ.get('data_csv', 'data.csv')\n",
    "\n",
    "# destination path and parquet file name (default: data.parquet)\n",
    "output_data_parquet = os.environ.get('output_data_parquet', 'data.parquet')\n",
    "\n",
    "# url of master (default: local mode)\n",
    "master = os.environ.get('master', \"local[*]\")\n",
    "\n",
    "# temporal data storage for local execution\n",
    "data_dir = os.environ.get('data_dir', '../../data/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7ff12eff-e0f6-4301-b9a6-8a61caa33435",
   "metadata": {},
   "outputs": [],
   "source": [
    "parameters = list(\n",
    "    map(lambda s: re.sub('$', '\"', s),\n",
    "        map(\n",
    "            lambda s: s.replace('=', '=\"'),\n",
    "            filter(\n",
    "                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
    "                sys.argv\n",
    "            )\n",
    "    )))\n",
    "\n",
    "for parameter in parameters:\n",
    "    logging.warning('Parameter: ' + parameter)\n",
    "    exec(parameter)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "spatial-premiere",
   "metadata": {
    "papermill": {
     "duration": 0.013595,
     "end_time": "2021-03-22T20:29:25.991349",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.977754",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "skip = False\n",
    "\n",
    "if os.path.exists(data_dir + output_data_parquet):\n",
    "    skip = True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "welsh-grave",
   "metadata": {
    "papermill": {
     "duration": 4.178678,
     "end_time": "2021-03-22T20:29:30.176328",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.997650",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "if not skip:\n",
    "    sc = SparkContext.getOrCreate(SparkConf().setMaster(master))\n",
    "    spark = SparkSession.builder.getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "adjacent-yemen",
   "metadata": {
    "papermill": {
     "duration": 4.693188,
     "end_time": "2021-03-22T20:29:34.875295",
     "exception": false,
     "start_time": "2021-03-22T20:29:30.182107",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "if not skip:\n",
    "    df = spark.read.option('header', 'true').csv(data_dir + data_csv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "metropolitan-issue",
   "metadata": {
    "papermill": {
     "duration": 458.264571,
     "end_time": "2021-03-22T20:37:13.150144",
     "exception": false,
     "start_time": "2021-03-22T20:29:34.885573",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "if not skip:\n",
    "    df.write.parquet(data_dir + output_data_parquet)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 470.538548,
   "end_time": "2021-03-22T20:37:13.369954",
   "environment_variables": {},
   "exception": null,
   "input_path": "/home/jovyan/work/examples/pipelines/pairs/component-library/transform/spark-csv-to-parquet.ipynb",
   "output_path": "/home/jovyan/work/examples/pipelines/pairs/component-library/transform/spark-csv-to-parquet.ipynb",
   "parameters": {},
   "start_time": "2021-03-22T20:29:22.831406",
   "version": "2.3.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
